Introducing: Map

Map makes it easy to achieve parallelism, and we have already covered it in the functional programming section. For those who have not read that section: map is a function that applies another function to every item of a sequence.

It lends itself to parallelism because the requested function is called independently on each element of a list or array. In other words, map applies a function to all the items in the given list and returns the results as a new collection.

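Conceptually, it takes a function and a collection of items as parameters, makes a new, empty collection, runs the function on each item in the original collection, and inserts each return value into the new collection. It then returns the new collection; the original is left unchanged. (In Python 3 the built-in map returns a lazy iterator rather than a list, which is why the examples below wrap it in tuple() or list().)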
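The description above can be sketched in a few lines of plain Python. This is only an illustration of the idea, not how the built-in is implemented; `my_map` is a hypothetical name introduced here.

```python
def my_map(func, items):
    """Illustrative stand-in for map(): returns a new list of results."""
    results = []                    # the new, empty collection
    for item in items:              # visit each item of the original collection
        results.append(func(item))  # store the function's return value
    return results                  # the original collection is left untouched


names = ("Manish", "Aalok", "Mayank", "Durga")
print(my_map(len, names))                            # [6, 5, 6, 5]
print(my_map(len, names) == list(map(len, names)))   # True
```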

This is a simple map that takes a tuple of names and returns a tuple containing the length of each name:


In [1]:
names =  ("Manish", "Aalok", "Mayank","Durga")

lst = tuple(map(len, names))
print(lst)

# This is a map that squares every number in the passed collection:
power = map(lambda x: x*x, lst)
print(list(power))


(6, 5, 6, 5)
[36, 25, 36, 25]

Multiprocessing


In [3]:
from multiprocessing import Pool
from multiprocessing.dummy import Pool as ThreadPool 

# Initialize the pool; with no argument it uses one worker per CPU reported by the system
pool = ThreadPool()

In [4]:
import multiprocessing
multiprocessing.cpu_count()


Out[4]:
2

In [5]:
# Sets the pool size to 4.
# Experiment with the value until you find the one that performs best.
# With no argument, ThreadPool() uses cpu_count() workers (2 on this machine).
pool = ThreadPool(4)
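As a hedged sketch of how the pool relates to the built-in map: `pool.map` takes the same (function, iterable) arguments, blocks until all workers have finished, and returns a list in input order. The variable names below are illustrative only.

```python
from multiprocessing.dummy import Pool as ThreadPool

names = ("Manish", "Aalok", "Mayank", "Durga")

# The context manager cleans up the worker threads when the block exits.
with ThreadPool(4) as pool:
    parallel_lengths = pool.map(len, names)   # blocks until every task is done

serial_lengths = list(map(len, names))        # same work, one item at a time

print(parallel_lengths)                       # [6, 5, 6, 5]
print(parallel_lengths == serial_lengths)     # True
```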

In [8]:
from urllib.request import urlopen
from multiprocessing.dummy import Pool as ThreadPool 

urls = [
  'http://www.python.org', 
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
  'http://planet.python.org/',
  'https://wiki.python.org/moin/LocalUserGroups',
  'http://www.python.org/psf/',
  'http://docs.python.org/devguide/',
  'http://www.python.org/community/awards/'
  ]

# Make the Pool of workers
pool = ThreadPool(10) 
# Open the urls in their own threads
# and return the results
results = pool.map(urlopen, urls)
#close the pool and wait for the work to finish 
pool.close() 
pool.join() 
print(results)


---------------------------------------------------------------------------
SSLEOFError                               Traceback (most recent call last)
e:\apps\python36\lib\urllib\request.py in do_open(self, http_class, req, **http_conn_args)
   1317                 h.request(req.get_method(), req.selector, req.data, headers,
-> 1318                           encode_chunked=req.has_header('Transfer-encoding'))
   1319             except OSError as err: # timeout error

e:\apps\python36\lib\http\client.py in request(self, method, url, body, headers, encode_chunked)
   1238         """Send a complete request to the server."""
-> 1239         self._send_request(method, url, body, headers, encode_chunked)
   1240 

e:\apps\python36\lib\http\client.py in _send_request(self, method, url, body, headers, encode_chunked)
   1284             body = _encode(body, 'body')
-> 1285         self.endheaders(body, encode_chunked=encode_chunked)
   1286 

e:\apps\python36\lib\http\client.py in endheaders(self, message_body, encode_chunked)
   1233             raise CannotSendHeader()
-> 1234         self._send_output(message_body, encode_chunked=encode_chunked)
   1235 

e:\apps\python36\lib\http\client.py in _send_output(self, message_body, encode_chunked)
   1025         del self._buffer[:]
-> 1026         self.send(msg)
   1027 

e:\apps\python36\lib\http\client.py in send(self, data)
    963             if self.auto_open:
--> 964                 self.connect()
    965             else:

e:\apps\python36\lib\http\client.py in connect(self)
   1399             self.sock = self._context.wrap_socket(self.sock,
-> 1400                                                   server_hostname=server_hostname)
   1401             if not self._context.check_hostname and self._check_hostname:

e:\apps\python36\lib\ssl.py in wrap_socket(self, sock, server_side, do_handshake_on_connect, suppress_ragged_eofs, server_hostname, session)
    406                          server_hostname=server_hostname,
--> 407                          _context=self, _session=session)
    408 

e:\apps\python36\lib\ssl.py in __init__(self, sock, keyfile, certfile, server_side, cert_reqs, ssl_version, ca_certs, do_handshake_on_connect, family, type, proto, fileno, suppress_ragged_eofs, npn_protocols, ciphers, server_hostname, _context, _session)
    813                         raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets")
--> 814                     self.do_handshake()
    815 

e:\apps\python36\lib\ssl.py in do_handshake(self, block)
   1067                 self.settimeout(None)
-> 1068             self._sslobj.do_handshake()
   1069         finally:

e:\apps\python36\lib\ssl.py in do_handshake(self)
    688         """Start the SSL/TLS handshake."""
--> 689         self._sslobj.do_handshake()
    690         if self.context.check_hostname:

SSLEOFError: EOF occurred in violation of protocol (_ssl.c:777)

During handling of the above exception, another exception occurred:

URLError                                  Traceback (most recent call last)
<ipython-input-8-0fb52b033803> in <module>()
     22 # Open the urls in their own threads
     23 # and return the results
---> 24 results = pool.map(urlopen, urls)
     25 #close the pool and wait for the work to finish
     26 pool.close()

e:\apps\python36\lib\multiprocessing\pool.py in map(self, func, iterable, chunksize)
    264         in a list that is returned.
    265         '''
--> 266         return self._map_async(func, iterable, mapstar, chunksize).get()
    267 
    268     def starmap(self, func, iterable, chunksize=None):

e:\apps\python36\lib\multiprocessing\pool.py in get(self, timeout)
    642             return self._value
    643         else:
--> 644             raise self._value
    645 
    646     def _set(self, i, obj):

e:\apps\python36\lib\multiprocessing\pool.py in worker(inqueue, outqueue, initializer, initargs, maxtasks, wrap_exception)
    117         job, i, func, args, kwds = task
    118         try:
--> 119             result = (True, func(*args, **kwds))
    120         except Exception as e:
    121             if wrap_exception and func is not _helper_reraises_exception:

e:\apps\python36\lib\multiprocessing\pool.py in mapstar(args)
     42 
     43 def mapstar(args):
---> 44     return list(map(*args))
     45 
     46 def starmapstar(args):

e:\apps\python36\lib\urllib\request.py in urlopen(url, data, timeout, cafile, capath, cadefault, context)
    221     else:
    222         opener = _opener
--> 223     return opener.open(url, data, timeout)
    224 
    225 def install_opener(opener):

e:\apps\python36\lib\urllib\request.py in open(self, fullurl, data, timeout)
    524             req = meth(req)
    525 
--> 526         response = self._open(req, data)
    527 
    528         # post-process response

e:\apps\python36\lib\urllib\request.py in _open(self, req, data)
    542         protocol = req.type
    543         result = self._call_chain(self.handle_open, protocol, protocol +
--> 544                                   '_open', req)
    545         if result:
    546             return result

e:\apps\python36\lib\urllib\request.py in _call_chain(self, chain, kind, meth_name, *args)
    502         for handler in handlers:
    503             func = getattr(handler, meth_name)
--> 504             result = func(*args)
    505             if result is not None:
    506                 return result

e:\apps\python36\lib\urllib\request.py in https_open(self, req)
   1359         def https_open(self, req):
   1360             return self.do_open(http.client.HTTPSConnection, req,
-> 1361                 context=self._context, check_hostname=self._check_hostname)
   1362 
   1363         https_request = AbstractHTTPHandler.do_request_

e:\apps\python36\lib\urllib\request.py in do_open(self, http_class, req, **http_conn_args)
   1318                           encode_chunked=req.has_header('Transfer-encoding'))
   1319             except OSError as err: # timeout error
-> 1320                 raise URLError(err)
   1321             r = h.getresponse()
   1322         except:

URLError: <urlopen error EOF occurred in violation of protocol (_ssl.c:777)>
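The traceback above shows that a single failing URL makes `pool.map` raise, so the results of all other URLs are lost. One possible way around this, sketched here under assumptions, is to catch the error inside the worker and return it as data. The names `fetch_status` and `fetch_all`, the 10-second timeout, and the returned tuple layout are hypothetical choices for this example, not part of the original notebook.

```python
from multiprocessing.dummy import Pool as ThreadPool
from urllib.error import URLError
from urllib.request import urlopen


def fetch_status(url, timeout=10):
    """Return (url, status, None) on success or (url, None, error) on failure."""
    try:
        # The with-block closes the response once the status has been read.
        with urlopen(url, timeout=timeout) as response:
            return url, response.status, None
    except (URLError, ValueError, OSError) as err:
        # URLError covers network and SSL failures; ValueError covers malformed URLs.
        return url, None, err


def fetch_all(urls, workers=4, fetch=fetch_status):
    """Run fetch over all urls in a thread pool and return the results in order."""
    with ThreadPool(workers) as pool:
        return pool.map(fetch, urls)


# A malformed URL fails before any network access, which makes it a safe demo.
for url, status, error in fetch_all(["not-a-valid-url"]):
    print(url, status, type(error).__name__ if error else None)
```

With this shape, `fetch_all(urls)` on the list above would return one tuple per URL, and the failing entries can be inspected or retried afterwards.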

In [24]:
%%time
# Note: despite its name, doubleMe raises x to the power x (a CPU-bound task)
def doubleMe(x):
    return x ** x

y = range(10000)

pool = ThreadPool(80)
# Apply doubleMe to every item of y using the thread pool
# and return the results
results = pool.map(doubleMe, y)
# Close the pool and wait for the work to finish
pool.close()
pool.join()
# print(results)


Wall time: 9.38 s

In [25]:
%%time
l = []
for a in y:
    l.append(doubleMe(a))


Wall time: 9.84 s
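The two timings above are nearly identical because doubleMe is pure-Python, CPU-bound work, and CPython threads take turns holding the global interpreter lock. Threads tend to pay off when tasks spend most of their time waiting. A small sketch, assuming `time.sleep` as a stand-in for a network or disk wait; `wait_for_io` and the chosen durations are illustrative only.

```python
import time
from multiprocessing.dummy import Pool as ThreadPool


def wait_for_io(seconds):
    """Pretend to wait on I/O; sleeping releases the GIL so other threads can run."""
    time.sleep(seconds)
    return seconds


delays = [0.2] * 8

# Threaded: the eight waits overlap.
start = time.perf_counter()
with ThreadPool(8) as pool:
    pool.map(wait_for_io, delays)
threaded = time.perf_counter() - start

# Serial: the eight waits happen one after another.
start = time.perf_counter()
for delay in delays:
    wait_for_io(delay)
serial = time.perf_counter() - start

print(f"threaded: {threaded:.2f} s, serial: {serial:.2f} s")
```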

Communication between the processes

Running a computation in multiple processes requires some communication between these processes. One of the nice aspects of multiprocessing in Python is that most of the time you do not need to know how this communication is handled: it just works. However, it is useful to understand the basics of this mechanism in order to figure out how to solve two kinds of problems: unexpected errors, and bad performance.

Communication between processes takes the form of streams of bytes that travel through specific communication channels. To send an object from one process to another, Python has to convert it to a stream of bytes, and assemble the object back at the receiving end. Python's mechanism for doing these conversions was originally designed for storing objects in files and is implemented in the pickle module. Every argument that is passed to a Python function running in another process is pickled and then unpickled. The result of the function undergoes the same process on its way back.
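The round trip described above can be reproduced by hand with the pickle module. This is a minimal sketch of what happens to an argument on its way to another process; `task_args` is an illustrative value, and no second process is actually started here.

```python
import pickle

task_args = {"name": "Manish", "values": [1, 2, 3]}

payload = pickle.dumps(task_args)   # object -> stream of bytes
restored = pickle.loads(payload)    # bytes -> a new, equal object

print(type(payload).__name__)       # bytes
print(restored == task_args)        # True
print(restored is task_args)        # False: the receiver works on a copy
```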

There are two things you need to know about pickle in the context of multiprocessing. First, most objects can be pickled but some cannot. Second, pickling and unpickling take time and can sometimes add considerable overhead to your multiprocessing.

The objects that cannot be pickled come in two varieties: those for which pickling does not make sense, and those for which it has simply not been implemented. A good example of the first category is file objects. The second category contains mainly object types defined in extension modules whose authors didn't implement pickling. If you use an old release of NumPy, you may discover that its array-aware functions are not picklable, making it impossible to use such a function directly as a task in multiprocessing. For Python's built-in objects, there is one important restriction that is due to the implementation details of pickle: functions and classes can only be pickled if they are defined at the top level of a module. This means, for example, that if you define a function inside another function, you cannot pickle it and therefore cannot pass it to a multiprocessing task.
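A quick way to see these restrictions is to try pickling a few objects directly. The helper `is_picklable` below is a hypothetical convenience written for this sketch; the exact exception type raised for a nested function differs between Python versions, so several are caught.

```python
import os
import pickle


def is_picklable(obj):
    """Return True if pickle.dumps(obj) succeeds (hypothetical helper)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


def make_nested():
    def nested(x):          # defined inside another function
        return x * 2
    return nested


print(is_picklable(len))             # True: found by name in its module
print(is_picklable(make_nested()))   # False: not defined at module top level
with open(os.devnull) as handle:
    print(is_picklable(handle))      # False: file objects cannot be pickled
```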

The performance implications of pickling are rather obvious: you should try to pass as few arguments as possible to your tasks, and make sure you pass no more data than you really need to. For example, rather than passing a huge list and the index of the item that your task is supposed to process, you should pass only that item.
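The difference is easy to make visible by comparing the size of the pickled payloads. A rough sketch; the list size and the index are arbitrary values chosen for illustration.

```python
import pickle

big_list = list(range(100_000))
index = 5

whole = pickle.dumps((big_list, index))   # ship the whole list plus an index
single = pickle.dumps(big_list[index])    # ship only the item that is needed

print(f"list + index: {len(whole)} bytes, single item: {len(single)} bytes")
```

Every task that receives the whole list pays the larger cost again, once for pickling and once for unpickling.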

Key Points

  • CPU multiprocessing is a parallel programming technique that can harness the power of modern computers to help you perform more tasks more quickly.
  • The Python multiprocessing library allows you to create a pool of workers to carry out tasks in parallel.
  • Tasks are easy to describe using Python functions.
  • Care needs to be taken when executing code in parallel environments to avoid strange program behavior and wrong computations.
  • You can combine results from individual tasks, allowing each worker to share in the computational load.
  • It is important to use profiling before optimizing computer programs.
  • Metrics such as speedup and efficiency aid in evaluating the performance and utility of parallel programs.
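As a small illustration of the last point, speedup is commonly defined as serial time divided by parallel time, and efficiency as speedup divided by the number of workers. The sketch below applies those definitions to the two wall times measured earlier in this notebook (9.84 s for the plain loop, 9.38 s with 80 threads); the function names are illustrative.

```python
def speedup(serial_time, parallel_time):
    """How many times faster the parallel run was than the serial run."""
    return serial_time / parallel_time


def efficiency(serial_time, parallel_time, workers):
    """Speedup per worker; 1.0 would mean perfect scaling."""
    return speedup(serial_time, parallel_time) / workers


s = speedup(9.84, 9.38)
e = efficiency(9.84, 9.38, 80)
print(f"speedup: {s:.2f}, efficiency: {e:.3f}")   # speedup: 1.05, efficiency: 0.013
```

An efficiency this close to zero suggests the 80 threads added almost nothing for that CPU-bound task.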

Threading vs Multiprocessing

The threading module uses threads, the multiprocessing module uses processes. The difference is that threads run in the same memory space, while processes have separate memory. This makes it a bit harder to share objects between processes with multiprocessing. Because threads use the same memory, precautions have to be taken or two threads may write to the same memory at the same time. In CPython, the global interpreter lock (GIL) guards the interpreter's own internal state against this, but it does not make your own multi-step updates atomic, so shared data still needs explicit locking.
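One common precaution for shared data is a lock around every update. A minimal sketch, assuming a hypothetical `SafeCounter` class; the thread count and iteration count are arbitrary.

```python
import threading


class SafeCounter:
    """A counter whose increments are protected by a lock (illustrative)."""

    def __init__(self):
        self.value = 0
        self._lock = threading.Lock()

    def increment(self):
        with self._lock:        # only one thread may update at a time
            self.value += 1


def add_many(counter, n):
    for _ in range(n):
        counter.increment()


counter = SafeCounter()
threads = [threading.Thread(target=add_many, args=(counter, 10_000)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter.value)            # 40000
```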

Spawning processes is a bit slower than spawning threads. Once they are running, there is not much difference.

Pros

Threading:
  • Lightweight: low memory footprint
  • Shared memory: makes access to state from another context easier
  • Allows you to easily make responsive UIs
  • CPython C extension modules that properly release the GIL will run in parallel
  • Great option for I/O-bound applications

Multiprocessing (parallelism):
  • Separate memory space
  • Code is usually straightforward
  • Takes advantage of multiple CPUs & cores
  • Avoids GIL limitations for CPython
  • Eliminates most needs for synchronization primitives unless you use shared memory (instead, it's more of a communication model for IPC)
  • Child processes are interruptible/killable
  • Python multiprocessing module includes useful abstractions with an interface much like threading.Thread
  • A must with CPython for CPU-bound processing

Cons

Threading:
  • CPython: subject to the GIL
  • Not interruptible/killable
  • If not following a command queue/message pump model (using the Queue module), then manual use of synchronization primitives becomes a necessity (decisions are needed for the granularity of locking)
  • Code is usually harder to understand and to get right: the potential for race conditions increases dramatically

Multiprocessing (parallelism):
  • IPC a little more complicated with more overhead (communication model vs. shared memory/objects)
  • Larger memory footprint